package com.ns.main;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * Map-side (replicated) join of the order table against the small product
 * table. The product table is shipped to every task via the DistributedCache,
 * loaded into memory in the mapper's {@code setup()}, and each order record
 * is joined against it in {@code map()} — so the job runs with zero reducers.
 */
public class MapSideJoinMain extends Configured implements Tool {

    private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class);

    /**
     * Mapper performing the in-memory join: product records keyed by their
     * first column, order records matched on their fourth column.
     */
    public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, Text> {

        /** productId -> remaining four product columns, tab-separated. */
        private final HashMap<String, String> productInfo = new HashMap<String, String>();

        // Reused output holders to avoid per-record allocation (standard Hadoop idiom).
        private final Text outPutKey = new Text();
        private final Text outPutValue = new Text();

        /**
         * Runs once per task before any {@code map()} call: finds
         * tb_product.dat among the DistributedCache files and loads its
         * records into {@link #productInfo}.
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // May be null when no cache files were registered for this job.
            Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
            if (distributePaths == null) {
                return;
            }
            for (Path p : distributePaths) {
                if (!p.toString().endsWith("tb_product.dat")) {
                    continue;
                }
                // Read the localized cache file directly. (The original code
                // rebuilt the path via listFiles()[0] plus a hard-coded "\\"
                // separator, which only works in Windows local mode.)
                // try-with-resources closes the reader even on error — the
                // original leaked it. Charset made explicit (was platform default).
                try (BufferedReader br = new BufferedReader(new InputStreamReader(
                        new FileInputStream(p.toString()), StandardCharsets.UTF_8))) {
                    String productLine;
                    while ((productLine = br.readLine()) != null) {
                        // Expected layout: id c1 c2 c3 c4, space-separated.
                        String[] productPart = productLine.split(" ", 5);
                        if (productPart.length == 5) {
                            productInfo.put(productPart[0],
                                    productPart[1] + "\t" + productPart[2] + "\t"
                                            + productPart[3] + "\t" + productPart[4]);
                        }
                    }
                }
            }
        }

        /**
         * Joins one order record against the in-memory product table:
         * when the product id (4th order column) is present, emits
         * (productId, productCols + "\t" + orderCols).
         */
        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Skip empty lines.
            if (value == null || value.toString().isEmpty()) {
                return;
            }
            // Per-record state kept local (the original held it in instance fields).
            String[] orderPart = value.toString().split(" ", 4);
            // Drop malformed records.
            if (orderPart.length != 4) {
                return;
            }
            // Inner join on the product id.
            String productSecondPart = productInfo.get(orderPart[3]);
            if (productSecondPart != null) {
                outPutKey.set(orderPart[3]);
                outPutValue.set(productSecondPart + "\t" + orderPart[0]
                        + "\t" + orderPart[1] + "\t" + orderPart[2]);
                context.write(outPutKey, outPutValue);
            }
        }
    }

    static final String CACHE_PATH = "hdfs://hadoop-master:9000/data/three/input/tb_product.dat";
    static final String INPUT_PATH = "hdfs://hadoop-master:9000/data/three/input/tb_order.dat";
    static final String OUT_PATH = "hdfs://hadoop-master:9000/data/three/output";

    /**
     * Configures and submits the map-only join job.
     *
     * @param args unused; paths are the constants above
     * @return 0 on success, 1 on failure
     */
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Register the small table BEFORE constructing the Job — the Job
        // constructor copies the Configuration.
        DistributedCache.addCacheFile(new Path(CACHE_PATH).toUri(), conf);

        Job job = new Job(conf, "MapJoinMR");
        job.setNumReduceTasks(0); // map-only: the join happens entirely in the mapper

        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

        job.setJarByClass(MapSideJoinMain.class);
        job.setMapperClass(LeftOutJoinMapper.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Map output types are the final output types (no reducer).
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class); // was missing in the original
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    public static void main(String[] args) {
        try {
            System.exit(ToolRunner.run(new MapSideJoinMain(), args));
        } catch (Exception e) {
            // Keep the full stack trace (the original logged only the message)
            // and exit non-zero so callers can detect the failure (the original
            // fell through and exited 0 on exception).
            logger.error("MapSideJoinMain failed", e);
            System.exit(1);
        }
    }
}